e9f3506073f7784d2341d939d2ed372b7d56c88d,conduit-worker/src/main/java/com/inmobi/conduit/purge/DataPurgerService.java,DataPurgerService,getStreamsPathToPurge,#Map#boolean#,307

Before Change


      String streamName = entry.getKey();
      Path streamRootPath = entry.getValue();
      String tableName = null;
      if (isLocal) {
        tableName = LOCAL_TABLE_PREFIX + "_" + streamName;
      } else {
        tableName = TABLE_PREFIX + "_" + streamName;
      }
      LOG.debug("Find Paths to purge for stream [" + streamName
          + "] streamRootPath [" + streamRootPath + "]");
      // For each Stream, all years
      FileStatus[] years = getAllFilesInDir(streamRootPath, fs);
      if (years != null) {
        for (FileStatus year : years) {
          String yearVal = year.getPath().getName();
          // For each month
          FileStatus[] months = getAllFilesInDir(year.getPath(), fs);
          if (months != null && months.length >= 1) {
            for (FileStatus month : months) {
              String monthVal = month.getPath().getName();
              // For each day
              FileStatus[] days = getAllFilesInDir(month.getPath(), fs);
              if (days != null && days.length >= 1) {
                for (FileStatus day : days) {
                  String dayVal = day.getPath().getName();
                  // For each day
                  FileStatus[] hours = getAllFilesInDir(day.getPath(), fs);
                  if (hours != null && hours.length >= 1) {
                    for (FileStatus hour : hours) {
                      LOG.debug("Working for hour [" + hour.getPath() + "]");

                      String hourVal = hour.getPath().getName();
                      Calendar streamDate = CalendarHelper.getDateHour(yearVal,
                          monthVal, dayVal, hourVal);
                      LOG.debug("Validate [" + streamDate.toString()
                          + "] against retentionHours ["
                          + getRetentionPeriod(streamName) + "]");
                      if (isPurge(streamDate, getRetentionPeriod(streamName))) {
                        LOG.debug("Adding stream to purge [" + hour.getPath()
                            + "]");
                        Path hourPath = hour.getPath().makeQualified(fs);
                        addPartitionToList(streamName, tableName, hourPath, yearVal,
                            monthVal, dayVal, hourVal);
                        streamsToPurge.add(hourPath);
                      }
                    }
                  } else {

After Change


                      if (isPurge(streamDate, getRetentionPeriod(streamName))) {
                        LOG.debug("Adding stream to purge [" + hour.getPath()
                            + "]");
                        streamsToPurge.add(hour.getPath().makeQualified(fs));
                      }
                    }
                  } else {